package com.nulldev.util.internal.backport.httpclient_rw.impl.common;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;

import com.nulldev.util.internal.backport.concurrency9.concurrent.CompletableFuture;
import com.nulldev.util.internal.backport.concurrency9.concurrent.Flow;
import com.nulldev.util.internal.backport.concurrency9.concurrent.Flow.Subscriber;
import com.nulldev.util.internal.backport.httpclient_rw.backports.BackportedReference;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.SubscriberWrapper.SchedulingAction;
import com.nulldev.util.web.HttpClient.util.HttpClientUtil;

import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;

/**
 * Implements SSL using two SubscriberWrappers.
 *
 * <p>
 * Constructor takes two Flow.Subscribers: one that receives the network data
 * (after it has been encrypted by SSLFlowDelegate), and one that receives the
 * application data (after it has been decrypted by SSLFlowDelegate).
 *
 * <p>
 * Methods upstreamReader() and upstreamWriter() return the corresponding
 * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
 * See diagram below.
 *
 * <p>
 * How Flow.Subscribers are used in this class, and where they come from:
 * 
 * <pre>
 * {@code
 *
 *
 *
 * --------->  data flow direction
 *
 *
 *                         +------------------+
 *        upstreamWriter   |                  | downWriter
 *        ---------------> |                  | ------------>
 *  obtained from this     |                  | supplied to constructor
 *                         | SSLFlowDelegate  |
 *        downReader       |                  | upstreamReader
 *        <--------------- |                  | <--------------
 * supplied to constructor |                  | obtained from this
 *                         +------------------+
 *
 * Errors are reported to the downReader Flow.Subscriber
 *
 * }
 * </pre>
 */
public class SSLFlowDelegate {

	// Debug logger for the delegate itself; messages are tagged with dbgString().
	final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

	// Marker queued on the Writer's writeList when upstream completes; cleanList()
	// never removes it.
	private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
	// Empty buffer queued on the Writer to make it run a wrap() even though there
	// is no application data; it is recognised by identity, not by content.
	private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
	// When handshake is in progress trying to wrap may produce no bytes.
	private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
	// Monitoring is enabled when this property is set to "" or to "true"
	// (case-insensitive).
	private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
	private static final boolean isMonitored = monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));

	// Executor used to run the Reader's scheduler and the engine's delegated tasks.
	final Executor exec;
	// Unwrapping side: network bytes in, application bytes out.
	final Reader reader;
	// Wrapping side: application bytes in, network bytes out.
	final Writer writer;
	final SSLEngine engine;
	// String.valueOf(downWriter); only used to label debug output.
	final String tubeName; // hack
	final CompletableFuture<String> alpnCF; // completes on initial handshake
	// Only non-null when monitoring is enabled; registered with Monitor in the
	// constructor and removed again in normalStop().
	final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
	// Value returned by closeNotifyReceived().
	volatile boolean close_notify_received;
	// Completion futures assigned in the constructor; handleError() completes both
	// exceptionally.
	final CompletableFuture<Void> readerCF;
	final CompletableFuture<Void> writerCF;
	// Optional (may be null). When present, the Reader hands each incoming buffer
	// to it once the bytes have been copied into its read buffer.
	final Consumer<ByteBuffer> recycler;
	// Source of the per-instance id below; starts at 1.
	static AtomicInteger scount = new AtomicInteger(1);
	final int id;

	/**
	 * Creates an SSLFlowDelegate without a buffer recycler. Delegates to the
	 * five-argument constructor, passing {@code null} as the recycler.
	 *
	 * @param engine     the SSLEngine used to wrap and unwrap data
	 * @param exec       the executor handed to the five-argument constructor
	 * @param downReader the subscriber connected to the Reader
	 * @param downWriter the subscriber connected to the Writer
	 */
	public SSLFlowDelegate(SSLEngine engine, Executor exec, Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter) {
		this(engine, exec, null, downReader, downWriter);
	}

	/**
	 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. The Reader and the
	 * Writer each expose a completion {@link CompletableFuture}; an exceptional
	 * completion of either one stops the delegate, and the normal completion of
	 * both stops it as well.
	 *
	 * @param engine     the SSLEngine used to wrap and unwrap data
	 * @param exec       the executor used to run the Reader's scheduler and the
	 *                   engine's delegated tasks
	 * @param recycler   optional consumer that is handed each incoming buffer once
	 *                   its bytes have been copied; may be {@code null}
	 * @param downReader the subscriber connected to the Reader
	 * @param downWriter the subscriber connected to the Writer; its string form is
	 *                   also used as the name shown in debug output
	 */
	public SSLFlowDelegate(SSLEngine engine, Executor exec, Consumer<ByteBuffer> recycler, Subscriber<? super List<ByteBuffer>> downReader,
			Subscriber<? super List<ByteBuffer>> downWriter) {
		this.id = scount.getAndIncrement();
		this.tubeName = String.valueOf(downWriter);
		this.recycler = recycler;
		this.reader = new Reader();
		this.writer = new Writer();
		this.engine = engine;
		this.exec = exec;
		this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
		this.readerCF = reader.completion();
		// Must be the writer's own completion: it used to alias the reader's,
		// so a writer failure never reached stopOnError and handleError() never
		// failed the writer's completion.
		this.writerCF = writer.completion();
		readerCF.exceptionally(this::stopOnError);
		writerCF.exceptionally(this::stopOnError);

		CompletableFuture.allOf(reader.completion(), writer.completion()).thenRun(this::normalStop);
		this.alpnCF = new MinimalFuture<>();

		// connect the Reader to the downReader and the
		// Writer to the downWriter.
		connect(downReader, downWriter);

		if (isMonitored)
			Monitor.add(monitor);
	}

	/**
	 * Returns the current value of the volatile {@code close_notify_received}
	 * flag, which records that a TLS close_notify from the server was detected.
	 *
	 * @return true, if a close_notify was detected.
	 */
	public boolean closeNotifyReceived() {
		return close_notify_received;
	}

	/**
	 * Connects the read sink (downReader) to the SSLFlowDelegate Reader, and the
	 * write sink (downWriter) to the SSLFlowDelegate Writer. Called from within the
	 * constructor, after the Reader and Writer have been created. Overridden by
	 * SSLTube.
	 *
	 * @param downReader The left hand side read sink (typically, the HttpConnection
	 *                   read subscriber).
	 * @param downWriter The right hand side write sink (typically the SocketTube
	 *                   write subscriber).
	 */
	void connect(Subscriber<? super List<ByteBuffer>> downReader, Subscriber<? super List<ByteBuffer>> downWriter) {
		// The reader is subscribed first, then the writer.
		this.reader.subscribe(downReader);
		this.writer.subscribe(downWriter);
	}

	/**
	 * Returns the future that is completed with the negotiated ALPN once the
	 * initial handshake has finished. It is completed with {@code null} when ALPN
	 * is not available, and exceptionally when the delegate fails.
	 *
	 * @return a {@code CompletableFuture<String>} holding the negotiated
	 *         application protocol
	 */
	public CompletableFuture<String> alpn() {
		return alpnCF;
	}

	/**
	 * Completes {@code alpnCF} with the application protocol negotiated by the
	 * engine. Does nothing if the future is already done. When ALPN is not
	 * supported by the runtime, or when the engine cannot report the protocol, the
	 * future is completed with {@code null} so that callers waiting on
	 * {@link #alpn()} are never left hanging.
	 */
	private void setALPN() {
		// Handshake is finished. So, can retrieve the ALPN now
		if (alpnCF.isDone())
			return;
		if (!HttpClientUtil.hasALPN()) {
			if (debug.on())
				debug.log("setALPN (missing!) = null");
			alpnCF.complete(null);
			return;
		}
		try {
			String alpn = engine.getApplicationProtocol();
			if (debug.on())
				debug.log("setALPN = %s", alpn);
			alpnCF.complete(alpn);
		} catch (Throwable tx) {
			Log.logError(tx);
			// Previously the failure was only logged and alpnCF stayed
			// incomplete forever. Treat it like the "ALPN missing" case above;
			// complete() is a no-op if the future was completed in the meantime.
			alpnCF.complete(null);
		}
	}

	/**
	 * Builds a multi-line, human readable snapshot of this delegate: its id and
	 * name, the handshake state bits, the engine's handshake status, the recorded
	 * handshake history (when it is being collected) and the state of the Reader
	 * and of the Writer.
	 *
	 * @return the snapshot text
	 */
	public String monitor() {
		final StringBuilder report = new StringBuilder("SSL: id ");
		report.append(id).append(" ").append(dbgString());
		report.append(" HS state: ").append(states(handshakeState));
		report.append(" Engine state: ").append(engine.getHandshakeStatus().toString());
		if (stateList != null) {
			// history of handshake statuses and callers recorded by doHandshake
			report.append(" LL : ");
			for (String entry : stateList) {
				report.append(entry).append(" ");
			}
		}
		report.append("\r\n").append("Reader:: ").append(reader.toString());
		report.append("\r\n").append("Writer:: ").append(writer.toString());
		report.append("\r\n===================================");
		return report.toString();
	}

	/**
	 * Hook returned by the Reader's {@code enterScheduling()}. This base
	 * implementation always answers {@code SchedulingAction.CONTINUE}; being
	 * protected, it can be overridden by subclasses.
	 *
	 * @return {@code SchedulingAction.CONTINUE}
	 */
	protected SchedulingAction enterReadScheduling() {
		return SchedulingAction.CONTINUE;
	}

	/**
	 * Processing function for incoming data. Pass it thru SSLEngine.unwrap(). Any
	 * decrypted buffers returned to be passed downstream. Status codes:
	 * NEED_UNWRAP: do nothing. Following incoming data will contain any required
	 * handshake data NEED_WRAP: call writer.addData() with empty buffer NEED_TASK:
	 * delegate task to executor BUFFER_OVERFLOW: allocate larger output buffer.
	 * Repeat unwrap BUFFER_UNDERFLOW: keep buffer and wait for more data OK: return
	 * generated buffers.
	 *
	 * Upstream subscription strategy is to try and keep no more than TARGET_BUFSIZE
	 * bytes in readBuf
	 */
	final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
		// Maximum record size is 16k.
		// Because SocketTube can feeds us up to 3 16K buffers,
		// then setting this size to 16K means that the readBuf
		// can store up to 64K-1 (16K-1 + 3*16K)
		static final int TARGET_BUFSIZE = 16 * 1024;

		final SequentialScheduler scheduler;
		volatile ByteBuffer readBuf;
		volatile boolean completing;
		final Object readBufferLock = new Object();
		final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

		private final class ReaderDownstreamPusher implements Runnable {
			@Override
			public void run() {
				processData();
			}
		}

		Reader() {
			super();
			scheduler = SequentialScheduler.synchronizedScheduler(new ReaderDownstreamPusher());
			this.readBuf = ByteBuffer.allocate(1024);
			readBuf.limit(0); // keep in read mode
		}

		@Override
		public boolean supportsRecycling() {
			return recycler != null;
		}

		protected SchedulingAction enterScheduling() {
			return enterReadScheduling();
		}

		public final String dbgString() {
			return "SSL Reader(" + tubeName + ")";
		}

		/**
		 * entry point for buffers delivered from upstream Subscriber
		 */
		@Override
		public void incoming(List<ByteBuffer> buffers, boolean complete) {
			if (debugr.on())
				debugr.log("Adding %d bytes to read buffer", Utils.remaining(buffers));
			addToReadBuf(buffers, complete);
			scheduler.runOrSchedule(exec);
		}

		@Override
		public String toString() {
			return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() + ", count: " + count.toString() + ", scheduler: "
					+ (scheduler.isStopped() ? "stopped" : "running") + ", status: " + lastUnwrapStatus;
		}

		private void reallocReadBuf() {
			int sz = readBuf.capacity();
			ByteBuffer newb = ByteBuffer.allocate(sz * 2);
			readBuf.flip();
			Utils.copy(readBuf, newb);
			readBuf = newb;
		}

		@Override
		protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
			if (readBuf.remaining() > TARGET_BUFSIZE) {
				if (debugr.on())
					debugr.log("readBuf has more than TARGET_BUFSIZE: %d", readBuf.remaining());
				return 0;
			} else {
				return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
			}
		}

		// readBuf is kept ready for reading outside of this method
		private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
			assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
			synchronized (readBufferLock) {
				for (ByteBuffer buf : buffers) {
					readBuf.compact();
					while (readBuf.remaining() < buf.remaining())
						reallocReadBuf();
					readBuf.put(buf);
					readBuf.flip();
					// should be safe to call inside lock
					// since the only implementation
					// offers the buffer to an unbounded queue.
					// WARNING: do not touch buf after this point!
					if (recycler != null)
						recycler.accept(buf);
				}
				if (complete) {
					this.completing = complete;
					minBytesRequired = 0;
				}
			}
		}

		void schedule() {
			scheduler.runOrSchedule(exec);
		}

		void stop() {
			if (debugr.on())
				debugr.log("stop");
			scheduler.stop();
		}

		AtomicInteger count = new AtomicInteger(0);

		// minimum number of bytes required to call unwrap.
		// Usually this is 0, unless there was a buffer underflow.
		// In this case we need to wait for more bytes than what
		// we had before calling unwrap() again.
		volatile int minBytesRequired;

		// work function where it all happens
		final void processData() {
			try {
				if (debugr.on())
					debugr.log("processData:" + " readBuf remaining:" + readBuf.remaining() + ", state:" + states(handshakeState) + ", engine handshake status:"
							+ engine.getHandshakeStatus());
				int len;
				boolean complete = false;
				while (readBuf.remaining() > (len = minBytesRequired)) {
					boolean handshaking = false;
					try {
						EngineResult result;
						synchronized (readBufferLock) {
							complete = this.completing;
							if (debugr.on())
								debugr.log("Unwrapping: %s", readBuf.remaining());
							// Unless there is a BUFFER_UNDERFLOW, we should try to
							// unwrap any number of bytes. Set minBytesRequired to 0:
							// we only need to do that if minBytesRequired is not already 0.
							len = len > 0 ? minBytesRequired = 0 : len;
							result = unwrapBuffer(readBuf);
							len = readBuf.remaining();
							if (debugr.on()) {
								debugr.log("Unwrapped: result: %s", result.result);
								debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
							}
						}
						if (result.bytesProduced() > 0) {
							if (debugr.on())
								debugr.log("sending %d", result.bytesProduced());
							count.addAndGet(result.bytesProduced());
							outgoing(result.destBuffer, false);
						}
						if (result.status() == Status.BUFFER_UNDERFLOW) {
							if (debugr.on())
								debugr.log("BUFFER_UNDERFLOW");
							// not enough data in the read buffer...
							// no need to try to unwrap again unless we get more bytes
							// than minBytesRequired = len in the read buffer.
							synchronized (readBufferLock) {
								minBytesRequired = len;
								// more bytes could already have been added...
								assert readBuf.remaining() >= len;
								// check if we have received some data, and if so
								// we can just re-spin the loop
								if (readBuf.remaining() > len)
									continue;
								else if (this.completing) {
									if (debug.on()) {
										debugr.log("BUFFER_UNDERFLOW with EOF," + " %d bytes non decrypted.", len);
									}
									// The channel won't send us any more data, and
									// we are in underflow: we need to fail.
									throw new IOException("BUFFER_UNDERFLOW with EOF, " + len + " bytes non decrypted.");
								}
							}
							// request more data and return.
							requestMore();
							return;
						}
						if (complete && result.status() == Status.CLOSED) {
							if (debugr.on())
								debugr.log("Closed: completing");
							outgoing(Utils.EMPTY_BB_LIST, true);
							return;
						}
						if (result.handshaking()) {
							handshaking = true;
							if (debugr.on())
								debugr.log("handshaking");
							if (doHandshake(result, READER))
								continue; // need unwrap
							else
								break; // doHandshake will have triggered the write scheduler if necessary
						} else {
							if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
								handshaking = false;
								applicationBufferSize = engine.getSession().getApplicationBufferSize();
								packetBufferSize = engine.getSession().getPacketBufferSize();
								setALPN();
								resumeActivity();
							}
						}
					} catch (IOException ex) {
						errorCommon(ex);
						handleError(ex);
						return;
					}
					if (handshaking && !complete)
						return;
				}
				if (!complete) {
					synchronized (readBufferLock) {
						complete = this.completing && !readBuf.hasRemaining();
					}
				}
				if (complete) {
					if (debugr.on())
						debugr.log("completing");
					// Complete the alpnCF, if not already complete, regardless of
					// whether or not the ALPN is available, there will be no more
					// activity.
					setALPN();
					outgoing(Utils.EMPTY_BB_LIST, true);
				}
			} catch (Throwable ex) {
				errorCommon(ex);
				handleError(ex);
			}
		}

		private volatile Status lastUnwrapStatus;

		EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
			ByteBuffer dst = getAppBuffer();
			int len = src.remaining();
			while (true) {
				SSLEngineResult sslResult = engine.unwrap(src, dst);
				switch (lastUnwrapStatus = sslResult.getStatus()) {
					case BUFFER_OVERFLOW:
						// may happen if app size buffer was changed, or if
						// our 'adaptiveBufferSize' guess was too small for
						// the current payload. In that case, update the
						// value of applicationBufferSize, and allocate a
						// buffer of that size, which we are sure will be
						// big enough to decode whatever needs to be
						// decoded. We will later update adaptiveBufferSize
						// in OK: below.
						int appSize = applicationBufferSize = engine.getSession().getApplicationBufferSize();
						ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
						dst.flip();
						b.put(dst);
						dst = b;
						break;
					case CLOSED:
						assert dst.position() == 0;
						return doClosure(new EngineResult(sslResult));
					case BUFFER_UNDERFLOW:
						// handled implicitly by compaction/reallocation of readBuf
						assert dst.position() == 0;
						return new EngineResult(sslResult);
					case OK:
						int size = dst.position();
						if (debug.on()) {
							debugr.log("Decoded " + size + " bytes out of " + len + " into buffer of " + dst.capacity() + " remaining to decode: "
									+ src.remaining());
						}
						// if the record payload was bigger than what was originally
						// allocated, then sets the adaptiveAppBufferSize to size
						// and we will use that new size as a guess for the next app
						// buffer.
						if (size > adaptiveAppBufferSize) {
							adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
						}
						dst.flip();
						return new EngineResult(sslResult, dst);
				}
			}
		}
	}

	/**
	 * Something whose current state can be rendered as text, so that
	 * {@link Monitor} can print it periodically.
	 */
	public interface Monitorable {
		/**
		 * @return a textual description of the current state
		 */
		String getInfo();
	}

	/**
	 * Daemon thread that, every 20 seconds, prints the {@link Monitorable#getInfo()
	 * info} of every registered target to {@code System.out}. Targets are held
	 * through weak references, so registration alone does not keep them alive.
	 * The single instance is created and started when this class is initialised.
	 */
	public static class Monitor extends Thread {
		// Registered targets; also the lock protecting finalList.
		final List<WeakReference<Monitorable>> list;
		// Strong references to the FinalMonitorables that still have to be printed.
		final List<FinalMonitorable> finalList;
		final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
		static Monitor themon;

		static {
			themon = new Monitor();
			themon.start(); // comment out to disable the Monitor thread
		}

		// An instance used to temporarily store the
		// last observable state of a monitorable object.
		// When Monitor.remove(o) is called, we replace
		// 'o' with a FinalMonitorable whose reference
		// will be enqueued after the last observable state
		// has been printed.
		final class FinalMonitorable implements Monitorable {
			final String finalState;

			FinalMonitorable(Monitorable o) {
				finalState = o.getInfo();
				finalList.add(this);
			}

			@Override
			public String getInfo() {
				finalList.remove(this);
				return finalState;
			}
		}

		Monitor() {
			super("Monitor");
			setDaemon(true);
			list = Collections.synchronizedList(new LinkedList<>());
			finalList = new ArrayList<>(); // access is synchronized on list above
		}

		void addTarget(Monitorable o) {
			// A reference to null is never enqueued, so it would stay in the
			// list until some later removeTarget() call happens to prune it.
			if (o == null)
				return;
			list.add(new WeakReference<>(o, queue));
		}

		void removeTarget(Monitorable o) {
			// Nothing can have been registered for null; without this guard
			// the loop below could call it.remove() twice for one element
			// and the FinalMonitorable constructor would dereference null.
			if (o == null)
				return;
			// It can take a long time for GC to clean up references.
			// Calling Monitor.remove() early helps removing noise from the
			// logs/
			synchronized (list) {
				Iterator<WeakReference<Monitorable>> it = list.iterator();
				while (it.hasNext()) {
					Monitorable m = it.next().get();
					if (m == null) {
						// referent already collected: prune and move on
						it.remove();
						continue;
					}
					if (o == m) {
						it.remove();
						break;
					}
				}
				// Keep the last observable state around until it is printed.
				FinalMonitorable m = new FinalMonitorable(o);
				addTarget(m);
				BackportedReference.reachabilityFence(m);
			}
		}

		public static void add(Monitorable o) {
			themon.addTarget(o);
		}

		public static void remove(Monitorable o) {
			themon.removeTarget(o);
		}

		@Override
		public void run() {
			System.out.println("Monitor starting");
			try {
				while (true) {
					Thread.sleep(20 * 1000);
					synchronized (list) {
						// drop the references that were enqueued since last pass
						Reference<? extends Monitorable> expired;
						while ((expired = queue.poll()) != null)
							list.remove(expired);
						for (WeakReference<Monitorable> ref : list) {
							Monitorable o = ref.get();
							if (o == null)
								continue;
							if (o instanceof FinalMonitorable) {
								// printed once below, then removed on the next pass
								ref.enqueue();
							}
							System.out.println(o.getInfo());
							System.out.println("-------------------------");
						}
					}
					System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
				}
			} catch (InterruptedException e) {
				System.out.println("Monitor exiting with " + e);
				// keep the interrupt visible to whoever inspects this thread
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Processing function for outgoing data. Pass it thru SSLEngine.wrap() Any
	 * encrypted buffers generated are passed downstream to be written. Status
	 * codes: NEED_UNWRAP: call reader.addData() with empty buffer NEED_WRAP: call
	 * addData() with empty buffer NEED_TASK: delegate task to executor
	 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap BUFFER_UNDERFLOW:
	 * shouldn't happen on writing side OK: return generated buffers
	 */
	class Writer extends SubscriberWrapper {
		// Runs processData(); unlike the Reader's scheduler it is invoked
		// without an executor.
		final SequentialScheduler scheduler;
		// queues of buffers received from upstream waiting
		// to be processed by the SSLEngine
		final List<ByteBuffer> writeList;
		final Logger debugw = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
		// Set once upstream has completed, or once the engine reported CLOSED
		// while still producing bytes.
		volatile boolean completing;
		boolean completed; // only accessed in processData

		class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
			@Override
			public void run() {
				processData();
			}
		}

		Writer() {
			super();
			writeList = Collections.synchronizedList(new LinkedList<>());
			scheduler = new SequentialScheduler(new WriterDownstreamPusher());
		}

		/**
		 * Entry point for buffers delivered from upstream. On completion the
		 * SENTINEL marker is queued instead of data; either way the scheduler is
		 * then asked to run.
		 */
		@Override
		protected void incoming(List<ByteBuffer> buffers, boolean complete) {
			assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
			assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
			if (complete) {
				if (debugw.on())
					debugw.log("adding SENTINEL");
				completing = true;
				writeList.add(SENTINEL);
			} else {
				writeList.addAll(buffers);
			}
			if (debugw.on())
				debugw.log("added " + buffers.size() + " (" + Utils.remaining(buffers) + " bytes) to the writeList");
			scheduler.runOrSchedule();
		}

		public final String dbgString() {
			return "SSL Writer(" + tubeName + ")";
		}

		protected void onSubscribe() {
			if (debugw.on())
				debugw.log("onSubscribe initiating handshaking");
			addData(HS_TRIGGER); // initiates handshaking
		}

		void schedule() {
			scheduler.runOrSchedule();
		}

		void stop() {
			if (debugw.on())
				debugw.log("stop");
			scheduler.stop();
		}

		@Override
		public boolean closing() {
			return closeNotifyReceived();
		}

		private boolean isCompleting() {
			return completing;
		}

		/**
		 * Stops requesting from upstream while more than 10 buffers are queued;
		 * otherwise defers to the superclass.
		 */
		@Override
		protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
			if (writeList.size() > 10)
				return 0;
			else
				return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
		}

		// True when the HS_TRIGGER marker (compared by identity) is queued.
		private boolean hsTriggered() {
			synchronized (writeList) {
				for (ByteBuffer b : writeList)
					if (b == HS_TRIGGER)
						return true;
				return false;
			}
		}

		// Makes the writer run: queues HS_TRIGGER if nothing else is queued,
		// then asks the scheduler to run processData().
		void triggerWrite() {
			synchronized (writeList) {
				if (writeList.isEmpty()) {
					writeList.add(HS_TRIGGER);
				}
			}
			scheduler.runOrSchedule();
		}

		// Work function of the writer: wraps whatever is queued and sends the
		// produced bytes downstream, driving the handshake as required.
		private void processData() {
			boolean completing = isCompleting();

			try {
				if (debugw.on())
					debugw.log(
							"processData, writeList remaining:" + Utils.remaining(writeList) + ", hsTriggered:" + hsTriggered() + ", needWrap:" + needWrap());

				while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
					ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
					EngineResult result = wrapBuffers(outbufs);
					if (debugw.on())
						debugw.log("wrapBuffer returned %s", result.result);

					if (result.status() == Status.CLOSED) {
						// The engine is closed: stop pulling from upstream.
						if (!upstreamCompleted) {
							upstreamCompleted = true;
							upstreamSubscription.cancel();
						}
						if (result.bytesProduced() <= 0)
							return;

						if (!completing && !completed) {
							completing = this.completing = true;
							// There could still be some outgoing data in outbufs.
							writeList.add(SENTINEL);
						}
					}

					boolean handshaking = false;
					if (result.handshaking()) {
						if (debugw.on())
							debugw.log("handshaking");
						doHandshake(result, WRITER); // ok to ignore return
						handshaking = true;
					} else {
						// The handshake just finished if the state was HANDSHAKING
						// (the REQUESTING_TASKS bit, if set, makes this test fail).
						if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
							applicationBufferSize = engine.getSession().getApplicationBufferSize();
							packetBufferSize = engine.getSession().getPacketBufferSize();
							setALPN();
							resumeActivity();
						}
					}
					cleanList(writeList); // tidy up the source list
					sendResultBytes(result);
					if (handshaking) {
						if (!completing && needWrap()) {
							continue;
						} else {
							return;
						}
					}
				}
				if (completing && Utils.remaining(writeList) == 0) {
					// Everything was wrapped: signal completion downstream, once.
					if (!completed) {
						completed = true;
						writeList.clear();
						outgoing(Utils.EMPTY_BB_LIST, true);
					}
					return;
				}
				if (writeList.isEmpty() && needWrap()) {
					writer.addData(HS_TRIGGER);
				}
			} catch (Throwable ex) {
				errorCommon(ex);
				handleError(ex);
			}
		}

		// The SSLEngine insists on being given a buffer that is at least
		// SSLSession.getPacketBufferSize() long (usually 16K). If given
		// a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
		// has 6 bytes to wrap. Typical usage shows that for GET we
		// usually produce an average of ~ 100 bytes.
		// To avoid wasting space, and because allocating and zeroing
		// 16K buffers for encoding 6 bytes is costly, we are reusing the
		// same writeBuffer to interact with SSLEngine.wrap().
		// If the SSLEngine produces less than writeBuffer.capacity() / 2,
		// then we copy off the bytes to a smaller buffer that we send
		// downstream. Otherwise, we send the writeBuffer downstream
		// and will allocate a new one next time.
		volatile ByteBuffer writeBuffer;
		// Status of the most recent engine.wrap() call; reported by toString().
		private volatile Status lastWrappedStatus;

		/**
		 * Wraps {@code src} into the reusable write buffer, growing that buffer and
		 * retrying for as long as the engine reports BUFFER_OVERFLOW.
		 *
		 * @param src the application buffers to encrypt
		 * @return the engine result; for OK and CLOSED it carries the produced
		 *         bytes (the shared empty buffer NOTHING when none were produced)
		 * @throws SSLException if thrown by the engine
		 */
		EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
			long len = Utils.remaining(src);
			if (debugw.on())
				debugw.log("wrapping " + len + " bytes");

			ByteBuffer dst = writeBuffer;
			if (dst == null)
				dst = writeBuffer = getNetBuffer();
			assert dst.position() == 0 : "buffer position is " + dst.position();
			assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();

			while (true) {
				SSLEngineResult sslResult = engine.wrap(src, dst);
				if (debugw.on())
					debugw.log("SSLResult: " + sslResult);
				switch (lastWrappedStatus = sslResult.getStatus()) {
					case BUFFER_OVERFLOW:
						// Shouldn't happen. We allocated buffer with packet size
						// get it again if net buffer size was changed
						if (debugw.on())
							debugw.log("BUFFER_OVERFLOW");
						int netSize = packetBufferSize = engine.getSession().getPacketBufferSize();
						ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
						dst.flip();
						b.put(dst);
						dst = b;
						break; // try again
					case CLOSED:
						if (debugw.on())
							debugw.log("CLOSED");
						// fallthrough. There could be some remaining data in dst.
						// CLOSED will be handled by the caller.
					case OK:
						final ByteBuffer dest;
						if (dst.position() == 0) {
							dest = NOTHING; // can happen if handshake is in progress
						} else if (dst.position() < dst.capacity() / 2) {
							// less than half the buffer was used.
							// copy off the bytes to a smaller buffer, and keep
							// the writeBuffer for next time.
							dst.flip();
							dest = Utils.copyAligned(dst);
							dst.clear();
						} else {
							// more than half the buffer was used.
							// just send that buffer downstream, and we will
							// get a new writeBuffer next time it is needed.
							dst.flip();
							dest = dst;
							writeBuffer = null;
						}
						if (debugw.on())
							debugw.log("OK => produced: %d bytes into %d, not wrapped: %d", dest.remaining(), dest.capacity(), Utils.remaining(src));
						return new EngineResult(sslResult, dest);
					case BUFFER_UNDERFLOW:
						// Shouldn't happen. Doesn't returns when wrap()
						// underflow handled externally
						// assert false : "Buffer Underflow";
						// NOTE(review): this branch logs through the outer 'debug'
						// logger rather than 'debugw' - confirm that is intended.
						if (debug.on())
							debug.log("BUFFER_UNDERFLOW");
						return new EngineResult(sslResult);
					default:
						if (debugw.on())
							debugw.log("result: %s", sslResult.getStatus());
						assert false : "result:" + sslResult.getStatus();
				}
			}
		}

		// True when the engine is waiting for a wrap() call.
		private boolean needWrap() {
			return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
		}

		// Pushes the produced bytes downstream, if there are any.
		private void sendResultBytes(EngineResult result) {
			if (result.bytesProduced() > 0) {
				if (debugw.on())
					debugw.log("Sending %d bytes downstream", result.bytesProduced());
				outgoing(result.destBuffer, false);
			}
		}

		@Override
		public String toString() {
			return "WRITER: " + super.toString() + ", writeList size: " + Integer.toString(writeList.size()) + ", scheduler: "
					+ (scheduler.isStopped() ? "stopped" : "running") + ", status: " + lastWrappedStatus;
			// " writeList: " + writeList.toString();
		}
	}

	/**
	 * Fails the whole delegate with {@code t}: completes readerCF, writerCF and
	 * alpnCF exceptionally, then stops the Reader's and the Writer's schedulers.
	 *
	 * @param t the failure to report
	 */
	private void handleError(Throwable t) {
		if (debug.on())
			debug.log("handleError", t);
		readerCF.completeExceptionally(t);
		writerCF.completeExceptionally(t);
		// no-op if already completed
		alpnCF.completeExceptionally(t);
		reader.stop();
		writer.stop();
	}

	// Set by normalStop(); makes stopping idempotent.
	boolean stopped;

	/**
	 * Stops the Reader and the Writer and, when monitoring is enabled,
	 * unregisters this delegate from the Monitor. Only the first call has any
	 * effect.
	 */
	private synchronized void normalStop() {
		if (!stopped) {
			stopped = true;
			reader.stop();
			writer.stop();
			if (isMonitored) {
				Monitor.remove(monitor);
			}
		}
	}

	/**
	 * Error handler installed on readerCF and writerCF with
	 * {@code exceptionally}: stops the delegate and yields {@code null} as the
	 * replacement value.
	 *
	 * @param currentlyUnused the failure; not inspected
	 * @return always {@code null}
	 */
	private Void stopOnError(Throwable currentlyUnused) {
		// maybe log, etc
		normalStop();
		return null;
	}

	/**
	 * Drops every fully consumed buffer from {@code l}, holding the list's monitor
	 * while doing so. The SENTINEL marker is always kept, even though it is empty.
	 *
	 * @param l the list to tidy up
	 */
	private void cleanList(List<ByteBuffer> l) {
		synchronized (l) {
			l.removeIf(buffer -> buffer != SENTINEL && !buffer.hasRemaining());
		}
	}

	/**
	 * States for handshake. We avoid races when accessing/updating the AtomicInt
	 * because updates always schedule an additional call to both the read() and
	 * write() functions.
	 *
	 * The state value combines one of the two states below (bit 0) with the task
	 * bit flags that follow.
	 */
	private static final int NOT_HANDSHAKING = 0;
	private static final int HANDSHAKING = 1;

	// Bit flags
	// a thread is currently executing tasks
	private static final int DOING_TASKS = 4;
	// a thread wants to execute tasks, while another thread is executing
	private static final int REQUESTING_TASKS = 8;
	private static final int TASK_BITS = 12; // Both bits

	// Values passed to doHandshake() to identify which side is calling.
	private static final int READER = 1;
	private static final int WRITER = 2;

	/**
	 * Renders a handshake state value as text, for debug output: the handshaking
	 * state first, followed by one marker per task bit that is set.
	 *
	 * @param state the state to describe; read once
	 * @return the textual form
	 * @throws InternalError if the non-task bits are neither NOT_HANDSHAKING nor
	 *                       HANDSHAKING
	 */
	private static String states(AtomicInteger state) {
		final int snapshot = state.get();
		final int phase = snapshot & ~TASK_BITS;
		final StringBuilder text = new StringBuilder();
		if (phase == NOT_HANDSHAKING) {
			text.append(" NOT_HANDSHAKING ");
		} else if (phase == HANDSHAKING) {
			text.append(" HANDSHAKING ");
		} else {
			throw new InternalError();
		}
		if ((snapshot & DOING_TASKS) != 0) {
			text.append("|DOING_TASKS");
		}
		if ((snapshot & REQUESTING_TASKS) != 0) {
			text.append("|REQUESTING_TASKS");
		}
		return text.toString();
	}

	/**
	 * Asks both the Reader and the Writer to run their processing function again;
	 * called when a handshake step or a batch of delegated tasks has finished.
	 */
	private void resumeActivity() {
		reader.schedule();
		writer.schedule();
	}

	// Current handshake state: NOT_HANDSHAKING or HANDSHAKING, combined with
	// the DOING_TASKS / REQUESTING_TASKS bit flags.
	final AtomicInteger handshakeState;
	// History of handshake statuses and callers recorded by doHandshake();
	// only allocated when debug logging is on, null otherwise.
	final ConcurrentLinkedQueue<String> stateList = debug.on() ? new ConcurrentLinkedQueue<>() : null;

	// Accumulator applied atomically when a caller wants to run delegated
	// tasks. If nobody is running tasks yet, the result has DOING_TASKS set
	// (the caller becomes the runner); otherwise it has both DOING_TASKS and
	// REQUESTING_TASKS set. Only the HANDSHAKING bit of the previous value is
	// carried over.
	private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
		final int handshakeBit = current & HANDSHAKING;
		final boolean alreadyRunning = (current & DOING_TASKS) != 0;
		return alreadyRunning
				? (DOING_TASKS | REQUESTING_TASKS | handshakeBit)
				: (DOING_TASKS | handshakeBit);
	};

	// Accumulator applied atomically when a task runner has finished a batch.
	// If somebody requested tasks in the meantime, the result keeps
	// DOING_TASKS set (and drops REQUESTING_TASKS) so the runner goes around
	// again; otherwise both task bits are cleared. Only the HANDSHAKING bit of
	// the previous value is carried over.
	private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
		final int handshakeBit = current & HANDSHAKING;
		final boolean moreRequested = (current & REQUESTING_TASKS) != 0;
		return moreRequested ? (DOING_TASKS | handshakeBit) : handshakeBit;
	};

	/**
	 * Performs the next handshake step requested by the engine. The handshake
	 * status is dispatched on by name, case-insensitively.
	 *
	 * @param r      the engine result whose handshake status is acted upon
	 * @param caller READER or WRITER, identifying the calling side
	 * @return true if the caller can carry on by itself; false if the step was
	 *         handed over (to the task executor, or to the other side)
	 * @throws InternalError if the handshake status is not one of NEED_TASK,
	 *                       NEED_WRAP, NEED_UNWRAP or NEED_UNWRAP_AGAIN
	 */
	private boolean doHandshake(EngineResult r, int caller) {
		// Raise the HANDSHAKING bit unconditionally, keeping the task bits as they are.
		handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
		if (stateList != null && debug.on()) {
			stateList.add(r.handshakeStatus().toString());
			stateList.add(Integer.toString(caller));
		}
		final String statusName = r.handshakeStatus().name();

		if (statusName.equalsIgnoreCase("NEED_TASK")) {
			final int updated = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
			// When REQUESTING_TASKS is set someone else is or will do tasks.
			if ((updated & REQUESTING_TASKS) == 0) {
				if (debug.on()) {
					debug.log("obtaining and initiating task execution");
				}
				final List<Runnable> tasks = obtainTasks();
				executeTasks(tasks); // executeTasks will resume activity
			}
			return false;
		}

		if (statusName.equalsIgnoreCase("NEED_WRAP")) {
			if (caller != READER) {
				return true;
			}
			// the reader cannot wrap: hand over to the writer
			writer.triggerWrite();
			return false;
		}

		if (statusName.equalsIgnoreCase("NEED_UNWRAP_AGAIN") || statusName.equalsIgnoreCase("NEED_UNWRAP")) {
			// receiving-side data will trigger unwrap
			if (caller != WRITER) {
				return true;
			}
			// the writer cannot unwrap: hand over to the reader
			reader.schedule();
			return false;
		}

		throw new InternalError("Unexpected handshake status:" + r.handshakeStatus());
	}

	/**
	 * Drains the engine's delegated tasks.
	 *
	 * @return the tasks returned by successive calls to
	 *         {@code engine.getDelegatedTask()}, in order, up to the first null;
	 *         possibly empty, never null
	 */
	private List<Runnable> obtainTasks() {
		final List<Runnable> drained = new ArrayList<>();
		for (Runnable task = engine.getDelegatedTask(); task != null; task = engine.getDelegatedTask()) {
			drained.add(task);
		}
		return drained;
	}

	/**
	 * Submits a job to {@code exec} that runs the given delegated tasks and keeps
	 * going while there is more task work to do.
	 *
	 * <p>
	 * After each batch: if the engine still reports NEED_TASK, a new batch is
	 * obtained and run. Otherwise FINISH_OR_DO_TASKS is applied to the state word;
	 * if DOING_TASKS is still set afterwards (a request came in meanwhile), another
	 * batch is obtained and run, else the loop ends and activity is resumed. Any
	 * Throwable raised inside the job is passed to handleError().
	 *
	 * @param tasks the first batch of tasks to run
	 */
	private void executeTasks(List<Runnable> tasks) {
		exec.execute(() -> {
			try {
				List<Runnable> batch = tasks;
				if (debug.on())
					debug.log("#tasks to execute: " + Integer.toString(batch.size()));
				while (true) {
					for (Runnable task : batch) {
						task.run();
					}
					if (engine.getHandshakeStatus() != HandshakeStatus.NEED_TASK) {
						final int updated = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
						if ((updated & DOING_TASKS) == 0) {
							// nobody asked for another round: we are done
							break;
						}
						if (debug.on())
							debug.log("re-running tasks (B)");
					}
					batch = obtainTasks();
				}
				if (debug.on())
					debug.log("finished task execution");
				resumeActivity();
			} catch (Throwable t) {
				handleError(t);
			}
		});
	}

	// FIXME: acknowledge a received CLOSE request from peer
	/**
	 * Handles closure of the engine. Only acts when the engine reports NEED_WRAP
	 * with its inbound side done and its outbound side not yet done, i.e. a TLS
	 * close_notify has been received and an acknowledgement has to be sent back.
	 *
	 * @param r the engine result that led here
	 * @return {@code r}, unchanged
	 * @throws IOException declared by the signature; nothing in this method body
	 *                     throws it directly
	 */
	EngineResult doClosure(EngineResult r) throws IOException {
		if (debug.on())
			debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", r.result, engine.getHandshakeStatus(), engine.isOutboundDone(),
					engine.isInboundDone());
		if (engine.getHandshakeStatus() != HandshakeStatus.NEED_WRAP) {
			return r;
		}
		if (!engine.isInboundDone() || engine.isOutboundDone()) {
			return r;
		}
		if (debug.on())
			debug.log("doClosure: close_notify received");
		close_notify_received = true;
		if (writer.scheduler.isStopped()) {
			// We have received close_notify, but we won't be able to send
			// the acknowledgement. Nothing more will come from the socket
			// either, so mark the reader as completed.
			synchronized (reader.readBufferLock) {
				reader.completing = true;
			}
		} else {
			// Call doHandshake to finish the close handshake.
			doHandshake(r, READER);
		}
		return r;
	}

	/**
	 * Returns the upstream Flow.Subscriber of the reading (incoming) side. This
	 * flow must be given the encrypted data read from upstream (e.g. the socket)
	 * before it is decrypted.
	 *
	 * @return the {@code reader} subscriber of this delegate
	 */
	public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
		return reader;
	}

	/**
	 * Returns the upstream Flow.Subscriber of the writing (outgoing) side. This
	 * flow contains the plaintext data before it is encrypted.
	 *
	 * @return the {@code writer} subscriber of this delegate
	 */
	public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
		return writer;
	}

	/**
	 * Delegates to {@code reader.signalScheduling()}.
	 *
	 * @return the value returned by {@code reader.signalScheduling()}
	 */
	public boolean resumeReader() {
		return reader.signalScheduling();
	}

	/**
	 * Delegates to {@code reader.resetDownstreamDemand()}.
	 */
	public void resetReaderDemand() {
		reader.resetDownstreamDemand();
	}

	/**
	 * Pairs an {@link SSLEngineResult} with the destination buffer the engine
	 * operation wrote into (if any), and exposes convenience accessors over the
	 * wrapped result.
	 */
	static class EngineResult {
		/** The wrapped engine result. */
		final SSLEngineResult result;
		/** The destination buffer of the operation; null for a normal result. */
		final ByteBuffer destBuffer;

		/**
		 * Creates a normal result, without a destination buffer.
		 */
		EngineResult(SSLEngineResult result) {
			this(result, null);
		}

		/**
		 * Creates a result together with the buffer it was produced into.
		 */
		EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
			this.result = result;
			this.destBuffer = destBuffer;
		}

		/**
		 * @return true unless the handshake status is FINISHED or NOT_HANDSHAKING,
		 *         or the result status is CLOSED
		 */
		boolean handshaking() {
			final HandshakeStatus hs = result.getHandshakeStatus();
			if (hs == HandshakeStatus.FINISHED) {
				return false;
			}
			if (hs == HandshakeStatus.NOT_HANDSHAKING) {
				return false;
			}
			return result.getStatus() != Status.CLOSED;
		}

		/**
		 * @return true if the handshake status is exactly NEED_UNWRAP
		 */
		boolean needUnwrap() {
			return HandshakeStatus.NEED_UNWRAP == result.getHandshakeStatus();
		}

		/** @return the number of bytes consumed, as reported by the result */
		int bytesConsumed() {
			return this.result.bytesConsumed();
		}

		/** @return the number of bytes produced, as reported by the result */
		int bytesProduced() {
			return this.result.bytesProduced();
		}

		/** @return the handshake status of the wrapped result */
		SSLEngineResult.HandshakeStatus handshakeStatus() {
			return this.result.getHandshakeStatus();
		}

		/** @return the overall status of the wrapped result */
		SSLEngineResult.Status status() {
			return this.result.getStatus();
		}
	}

	// The maximum network buffer size negotiated during
	// the handshake. Usually 16K.
	// Filled in lazily by getNetBuffer() from the engine session; a value <= 0
	// means it has not been fetched yet.
	volatile int packetBufferSize;

	/**
	 * Allocates a heap buffer sized to hold network (encrypted) data.
	 *
	 * <p>
	 * The size is taken from {@code packetBufferSize}; if that is not yet positive
	 * it is first read from the engine session and stored back in the field.
	 *
	 * @return a newly allocated buffer of the packet buffer size
	 */
	final ByteBuffer getNetBuffer() {
		int capacity = packetBufferSize;
		if (capacity <= 0) {
			capacity = engine.getSession().getPacketBufferSize();
			packetBufferSize = capacity;
		}
		return ByteBuffer.allocate(capacity);
	}

	// The maximum application buffer size negotiated during
	// the handshake. Usually close to 16K.
	// Filled in lazily by getAppBuffer() from the engine session; a value <= 0
	// means it has not been fetched yet.
	volatile int applicationBufferSize;
	// Despite the maximum applicationBufferSize negotiated
	// above, TLS records usually have a much smaller payload.
	// The adaptiveAppBufferSize records the max payload
	// ever decoded, and we use that as a guess for how big
	// a buffer we will need for the next payload.
	// This avoids allocating and zeroing a 16K buffer for
	// nothing...
	volatile int adaptiveAppBufferSize;

	/**
	 * Allocates a heap buffer for application (decrypted) data.
	 *
	 * <p>
	 * {@code applicationBufferSize} is read (and lazily initialized from the engine
	 * session when not positive) to serve as the upper bound. The buffer size is
	 * {@code adaptiveAppBufferSize} capped at that bound, or 512 when
	 * {@code adaptiveAppBufferSize} is not positive.
	 *
	 * @return a newly allocated buffer; it may be too small for the next record,
	 *         in which case the engine reports BUFFER_OVERFLOW, which is OK
	 */
	final ByteBuffer getAppBuffer() {
		int upperBound = applicationBufferSize;
		if (upperBound <= 0) {
			upperBound = engine.getSession().getApplicationBufferSize();
			applicationBufferSize = upperBound;
		}
		final int hint = adaptiveAppBufferSize;
		// Start with 512 when there is no hint yet: this is usually enough for
		// handshaking / headers. Otherwise never exceed the negotiated maximum.
		final int capacity = (hint <= 0) ? 512 : Math.min(hint, upperBound);
		return ByteBuffer.allocate(capacity);
	}

	/**
	 * @return a label of the form {@code SSLFlowDelegate(<tubeName>)} identifying
	 *         this delegate
	 */
	final String dbgString() {
		return "SSLFlowDelegate(" + tubeName + ")";
	}
}